[SPARK-57000][CORE][SS][RTM] Add concurrent scheduling capabilites for Real-time Mode#56055
[SPARK-57000][CORE][SS][RTM] Add concurrent scheduling capabilites for Real-time Mode#56055jerrypeng wants to merge 5 commits into
Conversation
…treaming Ports the ConcurrentStageDAGScheduler from the Databricks runtime so that streaming queries can opt in to a "real-time" execution mode that runs all stages of a job concurrently rather than sequentially. When enabled via spark.scheduler.dagSchedulerType=ConcurrentStageDAGScheduler and the per-job streaming.concurrent.stages.enabled property, the scheduler: - Marks all ancestor stages of the final stage as concurrent on job submission and validates that the cluster has enough free slots (CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT), gated by spark.scheduler.realtimeModeSlotsCheck.disabled. - Submits child stages while parents are still running, delays task completion events for a child whose parent is still running, and replays the delayed events when the parent finishes. - Rejects speculative execution. DAGScheduler changes (no-op for the default scheduler): - New protected onFinalStageCreated hook, invoked from handleJobSubmitted / handleMapStageSubmitted right after final stage creation. - New protected submitConcurrentStage and postSchedulerEvent helpers. - New package-visible isRunningStage and getStage accessors. - submitStage and markStageAsFinished relaxed from private to protected so subclasses can override them. DAGSchedulerSuite refactor: - Renames the concrete suite to abstract DAGSchedulerSuiteBase and adds an empty class DAGSchedulerSuite extends DAGSchedulerSuiteBase to preserve the existing entry point. - Extracts a TestDAGScheduler trait carrying the scheduleShuffleMergeFinalize and handleTaskCompletion overrides; MyDAGScheduler mixes the trait in. - Adds a protected createInitialScheduler hook used by init(). - Loosens submit, completeShuffleMapStageSuccessfully, completeNextResultStageWithSuccess, and assertDataStructuresEmpty to protected so subclass suites can use them. Integration: - SparkContext picks the scheduler implementation based on spark.scheduler.dagSchedulerType. - TaskSchedulerImpl uses maxFailures=1 for concurrent-stage TaskSets so a failure restarts the streaming query instead of being silently retried. - TaskSetManager counts ExecutorLostFailure toward task failures and skips the "executor lost is not the task's fault" exemption in concurrent mode. Adds the supporting LogKeys (PARENT_STAGE, STREAMING_QUERY_ID) and the CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT error class. Deviations from the runtime source kept to the minimum necessary to compile in OSS: - Extends DAGScheduler directly (runtime extends CrossJobDepDAGScheduler, which gates micro-batch pipelining; not part of OSS). - Hook is named onFinalStageCreated rather than the runtime's populateCrossJobDepInfo, since CrossJobDepDAGScheduler is not part of OSS. - Micro-batch pipelining co-existence check (and its test) dropped, since MBP is not part of OSS. - getStreamingBatchIdFromProperties and StreamingBatchId live in the companion object instead of CrossJobDepDAGScheduler. - Slot check uses sc.schedulerBackend.defaultParallelism() in place of the runtime's TaskSchedulerStats helper. - DatabricksEdgeConfigs.serverlessEnabled gating removed; the spark.scheduler.realtimeModeSlotsCheck.disabled config is the sole knob. - isConcurrentStagesEnabled tolerates null Properties (OSS TaskSet allows null in tests). Co-authored-by: Isaac
3d0058f to
e2a204b
Compare
| val totalSlots = sc.schedulerBackend.defaultParallelism() | ||
| val coresInUse = runningStages.toArray.map(totalNumCoreForStage(_)).sum | ||
| if (totalSlots - coresInUse < totalCoresNeeded) { | ||
| throw new SparkRuntimeException( |
There was a problem hiding this comment.
When this throws, the stages added to concurrentStages above are leaked — handleJobSubmitted catches the exception and fails the job, but nothing ever clears those entries. A subsequent job whose stages share IDs (e.g. retries from the same RDDChain) would inherit them. Either clear concurrentStages of the stages just visited before throwing, or capture them in a local set and only commit to concurrentStages once the slot check passes.
There was a problem hiding this comment.
fixed by accumulating into a local visitedStages set during the DAG walk and only committing to concurrentStages after the slot check passes
There was a problem hiding this comment.
Though the actual affect of this would likely be small since this would only occur on query failure.
| } | ||
|
|
||
| // This is overridden to handle any delayed task completion events for dependent stages. | ||
| override def markStageAsFinished( |
There was a problem hiding this comment.
The dependentStageMap cleanup path only fires when a stage in the map is named as a parent via markStageAsFinished(parent). If a dependent stage itself aborts mid-job (e.g. its single allowed failure under maxTaskFailures=1), its own entry — including any buffered delayedTaskCompletionEvents — is never removed from dependentStageMap. With concurrent jobs sharing a long-lived scheduler instance, that's a slow leak across queries. Consider clearing the entry for stage itself inside markStageAsFinished (especially when errorMessage.isDefined).
There was a problem hiding this comment.
I will just remove the stage's own entry at the end of markStageAsFinished
| STREAMING_DATA_SOURCE_NAME, | ||
| STREAMING_OFFSETS_END, | ||
| STREAMING_OFFSETS_START, | ||
| STREAMING_QUERY_ID, |
There was a problem hiding this comment.
QUERY_ID already exists and is what StructuredStreamingIdAwareSchedulerLogging uses to log streaming query IDs. Adding STREAMING_QUERY_ID creates a parallel key for the same concept. Suggest dropping this addition and using LogKeys.QUERY_ID at all the callsites, or update the callsites in StructuredStreamingIdAwareSchedulerLogging.
There was a problem hiding this comment.
A query id and streaming query id are typically not the same. query id for a batch query is simply a transient id for a batch. The streaming query id is persistent for the entirety of the streaming query execution.
I would keep it here and fix it in StructuredStreamingIdAwareSchedulerLogging
Fixes: - DAGScheduler.submitConcurrentStage: change `new IllegalStateException(...)` to `throw new IllegalStateException(...)` so the unexpected-state branch actually fails instead of silently being a no-op. - ConcurrentStageDAGScheduler.onFinalStageCreated: walk the DAG into a local `visitedStages` set and only commit to `concurrentStages` after the slot check passes, so a slot-check failure can't leak stage references into the long-lived scheduler state. - ConcurrentStageDAGScheduler.markStageAsFinished: unconditionally drop the stage's own entry from `dependentStageMap` at the end. On the success path the entry has already been removed by `checkDependentStageTasks`, so this is a no-op; on failure/cancellation/abort it's the missing cleanup that previously required the parent stage to be marked finished (which doesn't always happen if the parent is shared with another job). - ConcurrentStageDAGScheduler.onFinalStageCreated: speculation check also reads `sc.conf.get(SPECULATION_ENABLED)`, matching how the rest of core reads the config; users with cluster-wide spark.speculation=true were previously bypassing this guard. API cleanup: - Move `submitConcurrentStage` into ConcurrentStageDAGScheduler as a private method. Remove `postSchedulerEvent` entirely (callers now use `eventProcessLoop.post(event)` directly since it's already `private[spark]`). Relax `submitMissingTasks` and `activeJobForStage` to `protected` so the subclass can call them. - Reuse `StructuredStreamingIdAwareSchedulerLogging.QUERY_ID_KEY` and `BATCH_ID_KEY` constants instead of hardcoded strings; drop the unused `runId` field from `StreamingBatchId` (CrossJobDepDAGScheduler — which consumes it — is not part of this PR). Test scaffolding: - Add `protected def extraEmptyChecks(): Unit = ()` hook to `assertDataStructuresEmpty` in DAGSchedulerSuiteBase; override in ConcurrentStageDAGSchedulerSuite to assert `concurrentStages` and `dependentStageMap` are empty. - Also call `extraEmptyChecks()` in `afterEach`, so every inherited test (and every locally-defined test) automatically validates that the new state hasn't leaked. Pattern-match on the scheduler type to skip the check when an inherited test replaces the scheduler with a plain MyDAGScheduler. - Relax `failed` and `cancel` to `protected` in DAGSchedulerSuiteBase so subclass suites can use them. New tests (in ConcurrentStageDAGSchedulerSuite): - `concurrentStages is empty after slot-check failure` — exercises the visited-set commit pattern. - `dependentStageMap entry is cleaned up when a dependent stage aborts and its parent stage is shared with another job` — sets up a shared shuffle stage between a batch and a concurrent job; fails the concurrent job's leaf and verifies the cleanup runs even though the parent isn't marked finished. - `concurrentStages and dependentStageMap are cleaned up after job cancellation` — covers the JobCancelled event path. - `concurrentStages and dependentStageMap are cleaned up after executor-loss induced abort` — covers the maxFailures=1-abort path. - Speculation test split into per-job-property and cluster-wide-SparkConf variants; both verified to fail the job. Typos and wording: - Comment "states" → "stages" in ConcurrentStageDAGScheduler. - "has only has" → "has only" in the CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT error message. - "contribute the task failures" → "count toward the task failures" in TaskSetManager. - Test comment "4 tasks in stage C" → "4 tasks in stage D" in the complex- pipeline test. Co-authored-by: Isaac
|
@jiangxb1987 thank you for the review! I have addressed your comments and added additional tests. PTAL. |
| // enqueues any saved task completion event (if any). | ||
| private def checkDependentStageTasks(stage: Stage): Unit = { | ||
| val dependentStageInfo = dependentStageMap.getOrElse( | ||
| stage, throw new RuntimeException(s"Stage $stage is not in dependentStageMap") |
There was a problem hiding this comment.
This should be IllegalStateException instead of RuntimeException?
|
Thanks @mridulm for the question, and for your interest in this work! The short answer is that barrier execution mode and concurrent stage scheduling solve orthogonal problems. As I understand it, barrier mode is gang scheduling for the tasks within a single stage: it launches all N tasks of that stage simultaneously, and the tasks can then coordinate with each other mid-execution via barrier() / allGather() (MPI-style). What real-time mode needs is different — the ability to schedule multiple stages of a job to run concurrently (which is what this PR focuses on), so records can stream from upstream stages to downstream stages through a streaming shuffle. There's no hard requirement for all tasks to coordinate, or to be co-scheduled, before the query starts. Your question — whether RTM could benefit from gang scheduling — is a fair one. I think the answer is "maybe, but not strictly necessary." The streaming shuffle implements a backpressure mechanism that serves a similar purpose: if a downstream consumer isn't ready yet, the upstream producer backs off rather than failing, thus a coordinate execution system like barrier scheduling is not needed |
|
From a scheduler perspective, it appears that the primitives required to make this work already exists - whether it gets leveraged for mpi or real time mode is an implementation detail. Having said that, if we are robust to not requiring all stages/tasks to be running before execution (which was my previous understanding as per proposal - perhaps I misunderstood) - what is the gap w.r.t launching a bunch of long running map stages ? Essentially, I am trying to understand why this needs to be in the scheduler - versus in integration code To put it differently, I am trying to make sure we have robustly considered alternatives - and I want to understand their tradeoffs; they are not clear to me |
|
@mridulm — This change is not needed when the streaming query is a single stage: a single long-running (map) stage runs fine on the existing scheduler, which is exactly why RTM support for single-stage stateless queries already shipped in 4.1 — no scheduler change required there. However, multi-stage queries (e.g. stateful queries) are today executed one stage at a time, with each stage's shuffle output fully materialized before the next stage starts. To reach millisecond-level latencies, we instead need the stages of a single query to run concurrently, connected by the streaming shuffle (currently being merged in incrementally). Enabling concurrent execution of dependent stages within a single job is what requires the scheduler change — which is why this work is needed. |
|
@jerrypeng you can launch N number of map stages - and wire them to talk to each other for multi stage queries ? |
|
@mridulm we can do that, but that is probably not the most eloquent or simplest solution. A multi-stage query isn't a list of independent map stages — it's a connected stage DAG the planner already produces: shuffle-map stages at each exchange, a result stage at the sink, real shuffle-dependency edges between them, and branching where it exists (e.g. a join stage reads two shuffle inputs). To emulate that as "N map stages wired together" we'd have to re-cut the plan into separate jobs, pre-mint and inject shuffleIds to recreate the dependency edges, and re-implement cross-job failure/cancellation/completion so the micro-batch still behaves as one unit — i.e. re-derive the DAGScheduler's own stage decomposition and job coordination in streaming code, to fake a DAG we already have natively. So instead of faking it, we keep the real plan and its real stages, and change the only thing that's actually different in real-time mode: when the stages run. Normally a stage waits for its parent to finish; here the stages run at the same time, connected by the streaming shuffle. That's a scheduling decision about an existing DAG, which is why it belongs in the scheduler. |
|
@jerrypeng Everything you describe cam be entirely in the integration layer - instead of at the scheduler. We are letting the implementation details for a usecase define what the scheduler should look like, and making surgical changes to adapt to it - instead of defining what the semantics need to be (perhaps this has been done : but it is unclear from this PR anyway). |
Can you elaborate?
Sorry to hear that! Can you help me understand better what is not clear to you? |
For example, this integration could be modeled as I described above - submitting map stages with streaming shuffle wired up between the stages.
The proposed scheduler is codifying expectations specific to this implementation - and not generic constructs. In other words, I want to make sure we make scheduler changes only when required, where the behavior is not implementation details in service of an initiative, and constructs help unlock a larger class of usecases. |
|
@mridulm thank you for clarifying! Some background may help. Real-time mode is a new execution mode we're introducing in Structured Streaming that lets streaming queries process data with end-to-end latencies in the milliseconds. Reaching that requires a few changes to how queries execute; scheduling is one of them, and that's what this PR covers. For a query with multiple stages to hit millisecond latencies, the cluster has to run the tasks of all stages at the same time, with adjacent stages connected by a streaming shuffle (implemented in a separate set of PRs). That lets data flow continuously through the query DAG instead of one stage at a time — and processing one stage at a time is a core reason the current model can't reach these latencies. On the changes in this PR: the change to the existing DAG scheduler is small and additive — a no-op hook, a couple of accessors, and a few visibility relaxations — and the default behavior is unchanged. The new capability lives in a separate class ( On the semantics you asked about: the new capability is that the stages of a DAG can run concurrently rather than one at a time. Today the DAG scheduler treats a data dependency between two stages as a reason to run them sequentially — the child waits for the parent. But that sequencing isn't inherent to the dependency; it's a consequence of the shuffle being materialized, where the consumer can't read anything until the producer has written its complete output. With a streaming shuffle that the consumer reads incrementally, the producer and consumer stages can run at the same time. So the semantic this introduces is: a directional data dependency constrains the ordering of data, not necessarily the concurrency of execution — and when the connecting shuffle supports incremental reads, dependent stages may execute concurrently. I hope that clarifies it. This is a general scheduling capability — concurrent execution of data-dependent stages over an incrementally-readable shuffle — and real-time mode is simply its first consumer. I think the question next should be how to natively integrate this into the DAGScheduler so users don't need to specify to configure to use the ConcurrentDAGscheduler to get this capability. That is what I will be working on next. |
|
@mridulm do you have any additional concerns I can address? |
|
I am not in favor of merging this PR. Strawman proposal - extend support for realtime shuffle as a first class concept within DAGScheduler. Currently we have:
With semantics around how to handle failures, etc. Extend this to support real time shuffle as a first class support, and define : A lot of these are already in the current PR - we need to simply formalize them, and integrate into existing machinary. |
|
@mridulm thank you for the detailed feedback — I think we're aligned on the destination, and I'd like to propose reaching it incrementally. I agree the end state is: these scheduling semantics supported by the default DAGScheduler, with richer, more fine-grained abstractions — e.g. annotating in the query plan which shuffles can be read incrementally, rather than an opt-in flag. My question is whether we can sequence it into milestones rather than land it all at once. IMO this PR already declares clear semantics for the new scheduling capability, and they're fairly generic:
None of these reference streaming — real-time mode is just the first caller, and the capability isn't streaming-specific: any feature that uses an incrementally-readable shuffle can opt into the same semantics. The PR gates them behind a streaming-named property for expedience ( I'd also note the DAGScheduler footprint is deliberately small: the base-class change is a no-op hook, a couple of accessors, and a few visibility relaxations, with the default execution path unchanged — precisely because I share your concern that changes there are high-risk. The new behavior is fully opt-in, so structuring it this way keeps the blast radius small: unrelated queries can't be affected by these changes. That's also what makes landing it incrementally low-risk. Could we use this PR as the first milestone and merge it as-is? It would let us test and validate real-time mode end-to-end in-tree while we design the deeper integration. As an immediate follow-up, I will work through how to make these semantics more natively defined in the DAGScheduler. What do you think? |
|
@mridulm To give a sense of what I'm picturing for the more native design: It's set during physical planning (this is an execution concern, not a logical-plan one), flows into the ShuffleDependency the exchange creates, and is read by the DAGScheduler at stage-creation time. That single flag is the entry point that opts a shuffle edge into concurrent scheduling over an incremental shuffle (e.g. the "streaming shuffle" we're building for RTM). How RTM sets it. A physical-planning rule for RTM streaming queries marks the query's shuffle exchanges with incrementalHint = true. Nothing here is streaming-specific: any feature can write its own physical-planning rule to opt a job into concurrent scheduling + incremental shuffles, so the capability is generic — RTM is just the first caller. Semantics implied by an incremental shuffle:
A cleaner generalization — separate "incremental" from "persistent". Semantic (2) is really a consequence of a second, orthogonal property: whether the shuffle data is durable/replayable. We can make that explicit with a second flag: That decouples two concerns:
Though I would defer implementing the "persistentHint" capability until there is an actual use case / implementation. The streaming shuffle is incremental and transient. But a Kafka-backed shuffle, say, could be incremental and persistent — concurrent scheduling without the rerun-everything-on-failure penalty, since the consumer can replay from an offset. Splitting the two flags lets the construct compose across those cases instead of hardcoding RTM's failure model. Pluggable shuffle implementation via config. The engine maps the capability to a concrete ShuffleManager. Today we dispatch between sort and streaming managers on a per-job property; the cleaner version selects per-dependency from incrementalHint, with the implementations configured: A shuffle with incrementalHint = true is served by the configured incremental manager; everything else by the default — keeping the scheduler construct generic while the shuffle implementation stays pluggable. Let me know what you think though I would still prefer to do this incrementally like my previous comment. |
|
Let me rephrase it - why do we want to add this workaround ? And not integrate it into DAGScheduler ? Any scheduler change suffers from potential risks - and I am trying to reason about why we are special casing here and introducing an entirely new dag scheduler. If we want to make dag scheduler pluggable - that is a design in itself, and needs to be thought through - not as a derisking mechanism for a specific feature. |
|
@mridulm the implementation presented in this PR is not really a workaround — it is a working solution for the needs of RTM. I agree that we can eventually design a more natively integrated solution that provides more generic functionality. However, can we approach that incrementally? My philosophy for software development is iterative. I would like to first introduce something that works for the RTM use case, while minimizing risk to existing Spark use cases. That is what this PR is trying to do. The changes are intentionally scoped so that we can test RTM end-to-end without requiring a larger DAGScheduler redesign upfront. I would rather get something working first, validate it end-to-end, and then iteratively refine the abstractions. Building a more generic framework may take time, and I am happy to work toward that, but I do not think we should block RTM progress on having the fully generalized design in place from day one. Let me know what you think. Regardless I am going to look into how to natively built this functionality in the DAGScheduler. |
|
As I said before, I would suggest working towards integrating into If we want to make I am -1 on this specific direction for the change until then. +CC @jiangxb1987 as you reviewed this PR as well. |
|
@mridulm what do you think about the approach I described here: |
|
Joining late — I've read the thread and I'm with @mridulm on direction: this should be a principled construct inside The new ability is cross-stage gang scheduling, and that already implies a streaming shuffle. Co-scheduling two data-dependent stages is only useful if the edge is readable before the producer finishes — i.e. a streaming shuffle. So "run these stages concurrently" and "the shuffle is incremental" aren't orthogonal; they're one decision seen from two sides. The One nuance on "barrier": what's barrier-like is the resource side — the co-scheduled stages must all get slots at once or fail fast, which is exactly what this PR's Concretely, I'd express @mridulm's (a)–(d) as one first-class dependency kind rather than two hint flags: a pipelined shuffle dependency, peer to narrow/shuffle deps, set in physical planning and carried into the Add: the marker on Remove: On out-of-order completion: with stages concurrent, the result stage's tasks can finish while parents run, and the base Sequencing: this still gets us there incrementally, just cut differently — land the dependency type + group scheduling + group failure as the generic milestone with a non-streaming test, then add the RTM rule + streaming shuffle on top. The scheduler primitive is the hard-to-revise part, so I'd rather get its shape right in-tree first with RTM as the validating consumer than merge the subclass and re-cut it later. Happy to help review the dependency-type design. |
What changes were proposed in this pull request?
This PR introduces
ConcurrentStageDAGScheduler, the scheduler needed to power real-time mode for Structured Streaming.In real-time mode, a streaming query continuously produces output with end-to-end latency on the order of tens of milliseconds — far below the latency floor of traditional micro-batch
execution. To get there, the query has to abandon the "run stage N, materialize its shuffle output, then run stage N+1" model that the default
DAGSchedulerenforces. Instead, everystage of the query must run at the same time, with records flowing from upstream tasks to downstream tasks through a streaming shuffle as they're produced.
ConcurrentStageDAGScheduleris the scheduling half of that design. Concretely, it:DAGSchedulerimplementation,ConcurrentStageDAGScheduler, selected viaspark.scheduler.dagSchedulerType=ConcurrentStageDAGSchedulerand engaged per-job via thestreaming.concurrent.stages.enabled=trueproperty.(gated by
spark.scheduler.realtimeModeSlotsCheck.disabled, with a newCONCURRENT_SCHEDULER_INSUFFICIENT_SLOTerror class for the failure path). The DAG walk accumulates into a localset and only commits to scheduler state after the slot check passes, so a failed submission can't leak partial state.
invariant that
DAGScheduleronly sees "all of a stage's parents are done" task completions, even though tasks are physically running concurrently.DAGSchedulerto make this possible: one emptyonFinalStageCreatedhook, two package-private accessors (isRunningStage,getStage), andrelaxes
submitStage,markStageAsFinished,submitMissingTasks, andactiveJobForStagefromprivatetoprotected. All hooks are no-ops for the default scheduler.TaskSchedulerImpl(TaskSets with concurrent stages getmaxTaskFailures=1, since a streaming task failure must restart the query rather thansilently retry against a still-running shuffle) and
TaskSetManager(ExecutorLostFailurecounts towardmaxTaskFailuresinstead of being exempted, so executor loss propagates as aquery failure rather than a silent stall).
sc.conf.get(SPECULATION_ENABLED)— reject concurrent-stage jobs with speculation enabled, matching how the rest ofcore reads the config.
DAGSchedulerSuiteinto an abstractDAGSchedulerSuiteBase+TestDAGSchedulertrait so the new suite can reuse the existing scheduler test harness without duplicating it.Why are the changes needed?
Real-time mode is the only execution model in which a Structured Streaming query can deliver sub-100ms end-to-end latency, and concurrent stage scheduling is a hard prerequisite for it.
Here's why the default scheduler can't deliver that on its own:
Sequential stage execution is the latency floor for streaming. The default
DAGSchedulerwaits for stage N to complete — every task done, every byte written to shuffle storage, everymap output registered with the
MapOutputTracker— before submitting stage N+1. For a typical streaming query with a source, a stateful operator, and a sink, that means eachmicro-batch's latency is the sum of each stage's processing time plus the sum of each shuffle's serialization/deserialization cost. Even with small per-stage costs, the sum dominates
as queries get more complex, and there's no architectural way to reduce it within the existing scheduler.
Real-time mode pipes data between stages via a streaming shuffle, not a materialized one. Downstream tasks subscribe to upstream tasks' output as it's produced — there's no "stage N
is done, here are the map outputs" handoff. For that to work, all stages of the job must be running simultaneously when records start flowing. If stage N+1 isn't running yet, stage N has
no consumer for the records it produces and either drops them or blocks. So "schedule all stages concurrently" isn't an optimization for real-time mode — it's a correctness requirement of
the streaming shuffle.
Failure semantics also have to change. In batch mode, a task failure caused by an executor crash is exempted from the failure count because the executor's loss isn't the task's fault
and the framework can re-run the task elsewhere. In real-time mode that exemption is wrong: the streaming shuffle has in-flight records that can't be reconstructed, so an executor loss
must fail the query and let it restart from a checkpoint. Similarly, retrying a single task against a streaming shuffle that's already partially consumed would corrupt state — so
concurrent-stage TaskSets are capped at
maxFailures=1.The default scheduler must stay untouched for batch. Real-time mode is opt-in and additive — the cluster still needs to run batch and non-real-time streaming jobs with their existing
semantics. Hence the scheduler-type config, the per-job opt-in property, and the empty-by-default
onFinalStageCreatedhook on the baseDAGScheduler: when nothing opts in, nothingchanges.
Does this PR introduce any user-facing change?
No user-facing behavior change for any existing workload. Without setting the new config,
SparkContextbuilds the sameDAGSchedulerit always has, and the default scheduler'sbehavior is unchanged.
The PR does introduce two new internal configs (both
internal(), so not part of the public surface):spark.scheduler.dagSchedulerType— chooses theDAGSchedulerimplementation. Defaults to"DAGScheduler".spark.scheduler.realtimeModeSlotsCheck.disabled— skips the slot-availability check used by the concurrent scheduler. Defaults tofalse.And one new error class:
CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT— thrown by the concurrent scheduler when a streaming job needs more concurrent slots than the cluster offers.How was this patch tested?
Added one new test suite plus targeted regression tests in the existing TaskScheduler suites:
ConcurrentStageDAGSchedulerSuite— exercises the new scheduler end-to-end through the existingDAGSchedulerSuiteBasetest harness. Tests cover:runningStageson submission; the child's task completions are buffered until the parent finishes). Complexsix-stage DAG with diamond dependencies (verifies parent-tracking, deferred-event buffering, and correct release order when parents finish out of order). Concurrent stages disabled in
properties (scheduler falls back to default sequential behavior).
dependentStageMap(the parent isn't marked finished by the cascade, so the cleanup at the end of
markStageAsFinishedis the only path that can release the entry). Job cancellation cleans up bothconcurrentStagesanddependentStageMap. Executor-loss-induced abort (via themaxFailures=1path) cleans up both maps.SparkConf. Both cause the job to fail on submission with a clear error.extraEmptyCheckshook onassertDataStructuresEmptyis overridden to assert bothconcurrentStagesanddependentStageMapare empty, and is called fromafterEach, soevery locally-defined test and every inherited test from
DAGSchedulerSuiteBase(149 of them) automatically validates the new state invariants. Total: 155 tests pass in this suite.By inheriting from
DAGSchedulerSuiteBase, the suite also runs all 149 existingDAGSchedulertests againstConcurrentStageDAGScheduler— free regression coverage that the newscheduler behaves identically to
DAGSchedulerwhen concurrent mode is not engaged.TaskSchedulerImplSuite— one new test: aTaskSetwithstreaming.concurrent.stages.enabled=trueis submitted withmaxTaskFailures=1regardless ofspark.task.maxFailures; aregular
TaskSetstill gets the cluster default. Regression-guards both branches of the new conditional.TaskSetManagerSuite— two new tests covering the new failure-counting behavior:ExecutorLostFailurewithexitCausedByApp=falsecounts towardmaxTaskFailures(the query restarts rather than silently absorbing executorloss).
Full run:
core/testOnly *DAGSchedulerSuite *ConcurrentStageDAGSchedulerSuite *TaskSetManagerSuite *TaskSchedulerImplSuite→ 489 tests, all pass.Was this patch authored or co-authored using generative AI tooling?
Co-authored with Claude Code (Claude Opus 4.7)